量化中台整体路线图
Context
已完成 Phase 1-6(Lean回测 + 模拟交易 + Dashboard)。现在要从"单机量化工具"升级为"机构级量化中台"。
用户架构包含6层:数据层(L1-L4) → 数据加工层 → AI核心策略层 → 验证层 → 执行层 → 风控层。
当前资源: 8核CPU, 23GB RAM, 104GB磁盘可用, Redis + MongoDB 已有, 有多种付费数据源。 优先级: 数据层 + 加工层先行,暂不接券商。 使用场景: 机构内部系统。
当前完成度
| 架构层 | 完成度 | 说明 |
|---|---|---|
| 数据层 (L1-L4+历史) | 5% | 只有 yfinance (基础L1), zip文件存储 |
| 数据加工层 | 5% | 无pipeline, 无清洗标准化 |
| AI核心策略层 | 15% | 简单技术指标 + LLM情绪信号 |
| 验证层 (A/B) | 20% | 有回测 + 模拟, 无A/B框架 |
| 智能执行层 | 5% | 模拟交易, 无券商接入 |
| 风控管理层 | 10% | 基础风控(止损、仓位限制) |
阶段总览
阶段 A: 数据基础设施 ← 现在开始(本次实施)
阶段 B: 数据加工 + 因子化 ← 紧接着
阶段 C: AI 核心策略层
阶段 D: 执行层 + 风控层(开券商账户后)
阶段 A:数据基础设施(本次实施重点)
A1. TimescaleDB 时序数据库
安装 PostgreSQL + TimescaleDB 扩展,替代 zip 文件存储。
核心表:
-- L1 行情
CREATE TABLE market_ohlcv (
time TIMESTAMPTZ NOT NULL, symbol TEXT, source TEXT,
timeframe TEXT, open FLOAT8, high FLOAT8, low FLOAT8,
close FLOAT8, volume BIGINT, vwap FLOAT8
);
SELECT create_hypertable('market_ohlcv', 'time');
-- L2 Flow
CREATE TABLE flow_data (
time TIMESTAMPTZ NOT NULL, symbol TEXT, source TEXT,
flow_type TEXT, side TEXT, premium FLOAT8, volume BIGINT,
strike FLOAT8, expiry DATE, raw_data JSONB
);
-- L3 Info
CREATE TABLE info_data (
time TIMESTAMPTZ NOT NULL, source TEXT, category TEXT,
symbol TEXT, headline TEXT, sentiment FLOAT8,
confidence FLOAT8, raw_data JSONB
);
-- L4 另类
CREATE TABLE alt_data (
time TIMESTAMPTZ NOT NULL, source TEXT, category TEXT,
symbol TEXT, signal_value FLOAT8, metadata JSONB
);
-- 数据源注册
CREATE TABLE data_sources (
id SERIAL PRIMARY KEY, name TEXT UNIQUE, layer TEXT,
api_type TEXT, config JSONB, enabled BOOLEAN DEFAULT true,
last_sync TIMESTAMPTZ, status TEXT DEFAULT 'idle'
);
文件:
- src/data/__init__.py
- src/data/database.py — SQLAlchemy async 连接池 + 表定义
- src/data/models.py — Pydantic 模型(请求/响应)
- scripts/init_db.py — 初始化 TimescaleDB + 建表
- scripts/migrate_yfinance.py — 迁移现有 9个symbol 日线到数据库
A2. Connector 数据采集框架
统一的数据源接口,每个源一个 Connector。
文件:
- src/data/connectors/__init__.py
- src/data/connectors/base.py — BaseConnector 抽象类(fetch/stream/validate/transform)
- src/data/connectors/yfinance_conn.py — 重构现有 yfinance 逻辑
- 后续按需添加:unusual_whales.py, flowalgo.py, polymarket.py 等
BaseConnector 接口:
class BaseConnector(ABC):
source_name: str
layer: str # L1/L2/L3/L4
async def fetch(self, symbols, start, end) -> list[dict]
async def stream(self, symbols) -> AsyncIterator[dict] # 实时流(可选)
def transform(self, raw) -> list[dict] # 标准化为统一格式
A3. 数据调度器
src/data/scheduler.py— DataScheduler(APScheduler)- 定时策略:L1日线收盘后同步,L2/L3交易时段轮询,L4按源特性
A4. 数据质量监控
src/data/quality.py— 缺失率、异常值、延迟、完整性检查
A5. 数据查询 API
src/api/data_routes.py
GET /api/v1/data/ohlcv — 查询行情(symbol, start, end, timeframe)
GET /api/v1/data/flow — 查询Flow数据
GET /api/v1/data/sources — 数据源列表及状态
POST /api/v1/data/sync/{source} — 手动触发同步
GET /api/v1/data/quality — 数据质量报告
A6. Dashboard 数据管理页 (/data)
- 数据源状态卡片(名称、层级、最后同步、记录数、状态灯)
- 数据覆盖热力图(symbol × 日期)
- 手动同步按钮
- 数据质量指标面板
文件:
- dashboard/src/pages/DataManager.jsx
- dashboard/src/api/client.js — 增加 data API 函数
阶段 B:数据加工 + 因子化(A 完成后)
B1. 清洗管道
src/processing/pipeline.py— 去噪、去重、缺失补齐、时间对齐、币种统一
B2. 因子引擎
src/processing/factors/base.py— BaseFactor 抽象类src/processing/factors/technical.py— EMA、RSI、MACD、BB、动量src/processing/factors/flow.py— 异常扫单评分、暗池活跃度src/processing/factors/sentiment.py— 新闻/社媒/LLM情绪因子src/processing/factors/cross_asset.py— 跨资产相关性、板块动量src/processing/factor_store.py— 因子存储(TimescaleDBfactors表)
B3. 因子评估
src/processing/factor_eval.py— IC、衰减分析、相关性矩阵
阶段 C:AI 核心策略层(B 完成后)
C1. 异常检测 — Abnormal Score
src/core/abnormal_detector.py- 输入:期权溢价异常、Volume/OI突变、Golden Sweep、Gamma Squeeze、Dark Pool
- 输出:异常评分 (0-100) + 类型标签
C2. 趋势评分 — Trend Score
src/core/trend_scorer.py- 输入:价格动量、信息优势、情绪趋势、跨资产确认
- 输出:趋势分 (-100 ~ +100) + 方向 + 置信度
C3. AI 策略引擎 — Predicted P&L
src/core/strategy_engine.pysrc/core/portfolio_optimizer.py- 综合异常分+趋势分 → 策略候选 → 组合优化 → 预测收益/风险
C4. A/B 验证框架
src/core/ab_testing.py- 策略候选 → 模拟执行 → 显著性检验 → 参数迭代
阶段 D:执行 + 风控(开券商账户后)
- IB API 接入
- SOR 智能路由
- TCA 成本分析
- 完整风控(头寸/保证金/杠杆/流动性)
阶段 A 实施顺序
第一步 A1: 安装 TimescaleDB → 建表 → 迁移 yfinance 数据 第二步 A2: Connector 框架 + yfinance Connector 重构 第三步 A5: 数据查询 API 端点 第四步 A6: Dashboard 数据管理页 第五步 A3: 数据调度器(定时采集) 第六步 A4: 数据质量监控
新增依赖
sqlalchemy[asyncio]>=2.0
asyncpg>=0.29
psycopg2-binary>=2.9
apscheduler>=3.10
验证(阶段 A 完成后)
psql查询SELECT count(*) FROM market_ohlcv WHERE symbol='SPY'→ ~7000行curl /api/v1/data/ohlcv?symbol=SPY&start=2025-01-01→ 返回 JSONcurl /api/v1/data/sources→ 列出数据源及状态- Dashboard
/data页 → 数据源卡片 + 覆盖热力图 - 手动触发同步 → 数据入库 → 质量检查通过
修改的现有文件
main.py — 增加 data_routes, DataScheduler 初始化
config.py — 增加 DATABASE_URL 配置
requirements.txt — 增加数据库依赖
dashboard/src/App.jsx — 增加 /data 路由
dashboard/src/components/Sidebar.jsx — 增加 Data 导航
dashboard/src/api/client.js — 增加 data API 函数